package com.daoshu.socket.notice.impl;

import com.alibaba.fastjson.JSONObject;
import com.daoshu.socket.utils.MD5Util;
import com.daoshu.socket.view.PushMessageIn;
import com.daoshu.socket.config.NoticeTypeEnum;
import com.daoshu.socket.config.WebsocketConfig;
import com.daoshu.socket.config.ZookeeperConfig;
import com.daoshu.socket.notice.ISyncMessageNotice;
import com.daoshu.socket.service.IUserService;
import com.daoshu.socket.zookeeper.ZookeeperClient;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * ZooKeeper-backed message broadcast.
 *
 * <p>Outgoing messages are serialized to JSON and written as the data of the
 * node returned by {@code zookeeperConfig.getDataChangeNode()}. A data listener
 * subscribed to that same node parses every change back into a
 * {@link PushMessageIn} and hands it to {@link IUserService#sendMessageForUser}.
 *
 * <p>The ZooKeeper client is only created by {@link #initializer()} when the
 * configured notice type is {@code ZOOKEEPER}; otherwise this component stays
 * inactive.
 *
 * @author Allen
 * @create 2019-06-26 09:09
 **/
@Slf4j
@Component("syncMessageNoticeZookeeper")
public class SyncMessageNoticeZookeeper implements ISyncMessageNotice {

    @Autowired
    ZookeeperConfig zookeeperConfig;

    @Autowired
    IUserService userService;

    @Autowired
    WebsocketConfig websocketConfig;

    /** Created in {@link #initializer()}; stays {@code null} when ZooKeeper is not the configured notice type. */
    private ZkClient zkClient;

    /**
     * Creates the ZooKeeper client, ensures the data node exists and subscribes
     * to its data changes. Does nothing when the configured notice type is not
     * {@code ZOOKEEPER}.
     */
    @Override
    public void initializer() {
        log.info("SyncMessageNoticeZookeeper");
        if (!NoticeTypeEnum.ZOOKEEPER.getName().equalsIgnoreCase(websocketConfig.getNoticeType())) {
            return;
        }
        log.info("SyncMessageNoticeZookeeper initializer");
        zkClient = ZookeeperClient.createZkClient();
        initZkNode();
    }

    /**
     * Publishes a message by writing its JSON form as the data of the data-change node.
     *
     * @param pushMessageIn the message to broadcast
     * @throws IllegalStateException if the ZooKeeper client has not been initialized
     *                               (see {@link #initializer()})
     */
    @Override
    public void sendMessageNotice(PushMessageIn pushMessageIn) {
        if (zkClient == null) {
            // Fail with an explicit message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "ZooKeeper client is not initialized; notice type is '"
                            + websocketConfig.getNoticeType() + "'");
        }
        String message = JSONObject.toJSONString(pushMessageIn);
        // The MD5 digest is only used for log correlation; it is not sent anywhere.
        String messageMd5 = MD5Util.encodeByMD5(message);
        log.info("sendMessageNotice==>>>{}, MD5: {}", message, messageMd5);
        zkClient.writeData(zookeeperConfig.getDataChangeNode(), message);
    }

    /**
     * Handles a message received from the data-change node by forwarding it to
     * the user service.
     *
     * @param pushMessageIn the received message
     */
    @Override
    public void monitorMessageNotice(PushMessageIn pushMessageIn) {
        log.info("monitorMessageNotice{}", pushMessageIn);
        userService.sendMessageForUser(pushMessageIn);
    }

    /**
     * Ensures the data-change node (and every ancestor) exists, then registers
     * the data-change listener on it.
     *
     * <p>Missing nodes are created as persistent nodes with a random UUID as
     * placeholder data.
     */
    private void initZkNode() {
        String dataChangeNode = zookeeperConfig.getDataChangeNode();

        if (!zkClient.exists(dataChangeNode)) {
            StringBuilder currentPath = new StringBuilder();
            for (String segment : dataChangeNode.split("/")) {
                // Splitting "/a/b" yields a leading empty segment. It must be skipped
                // BEFORE appending, otherwise the accumulated path becomes "//a",
                // which ZooKeeper rejects as an empty node name.
                if (segment.isEmpty()) {
                    continue;
                }

                currentPath.append('/').append(segment);
                String nodePath = currentPath.toString();
                if (!zkClient.exists(nodePath)) {
                    zkClient.createPersistent(nodePath, UUID.randomUUID().toString());
                }
            }
        }

        // Listen for data changes on the node.
        zkClient.subscribeDataChanges(dataChangeNode, new SyncMessageHandler());
    }

    /**
     * Listens for data changes on the data-change node.
     */
    class SyncMessageHandler implements IZkDataListener {

        /**
         * Parses the new node data as a {@link PushMessageIn} and dispatches it
         * via {@link #monitorMessageNotice(PushMessageIn)}.
         *
         * @param s the path of the changed node
         * @param o the new node data; ignored when {@code null}
         */
        @Override
        public void handleDataChange(String s, Object o) throws Exception {
            if (o == null) {
                return;
            }
            PushMessageIn in = JSONObject.parseObject(String.valueOf(o), PushMessageIn.class);
            if (in == null) {
                // The parser can yield null (e.g. for a JSON "null" payload);
                // there is nothing to deliver in that case.
                log.warn("handleDataChange ignored unparseable payload on {}", s);
                return;
            }
            monitorMessageNotice(in);
        }

        /**
         * Node deletion is only logged; no recovery is attempted here.
         *
         * @param s the path of the deleted node
         */
        @Override
        public void handleDataDeleted(String s) throws Exception {
            log.debug("handleDataDeleted");
        }
    }
}
